# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.producer.ProducerConfig for more details

############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
metadata.broker.list={{ kafka_connection_string }}

{% if kafka_producer %}

{% if kafka_producer.request_required_acks %}
# This value controls when a produce request is considered completed.
# Specifically, how many other brokers must have committed the data to their log
# and acknowledged this to the leader? Typical values are:
#
# 0, which means that the producer never waits for an acknowledgement from the
#    broker (the same behavior as 0.7). This option provides the lowest
#    latency but the weakest durability guarantees (some data will be lost
#    when a server fails).
# 1, which means that the producer gets an acknowledgement after the leader
#    replica has received the data. This option provides better durability as
#    the client waits until the server acknowledges the request as successful
#    (only messages that were written to the now-dead leader but not yet
#    replicated will be lost).
# -1, The producer gets an acknowledgement after all in-sync replicas have
#    received the data. This option provides the greatest level of durability.
#    However, it does not completely eliminate the risk of message loss
#    because the number of in sync replicas may, in rare cases, shrink to 1.
#    If you want to ensure that some minimum number of replicas (typically a
#    majority) receive a write, then you must set the topic-level
#    min.insync.replicas setting. Please read the Replication section of the
#    design documentation for a more in-depth discussion.
request.required.acks={{ kafka_producer.request_required_acks }}
{% endif %}

{% if kafka_producer.request_timeout_ms %}
# The amount of time the broker will wait trying to meet the
# request.required.acks requirement before sending back an error to the client.
request.timeout.ms={{ kafka_producer.request_timeout_ms }}
{% endif %}

{% if kafka_producer.type %}
# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type={{ kafka_producer.type }}
{% endif %}

{% if kafka_producer.serializer_class %}
# message encoder
serializer.class={{ kafka_producer.serializer_class }}
{% endif %}

{% if kafka_producer.key_serializer_class %}
# The serializer class for keys (defaults to the same as for messages if nothing
# is given).
key.serializer.class={{ kafka_producer.key_serializer_class }}
{% endif %}

{% if kafka_producer.partitioner_class %}
# name of the partitioner class for partitioning events; default partition spreads data randomly
partitioner.class={{ kafka_producer.partitioner_class }}
{% endif %}

{% if kafka_producer.compression_codec %}
# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec={{ kafka_producer.compression_codec }}
{% endif %}

{% if kafka_producer.compressed_topics %}
# allow topic level compression
compressed.topics={{ kafka_producer.compressed_topics }}
{% endif %}

{% if kafka_producer.compressed_topics %}
# This property will cause the producer to automatically retry a failed send
# request. This property specifies the number of retries when such failures
# occur. Note that setting a non-zero value here can lead to duplicates in the
# case of network errors that cause a message to be sent but the acknowledgement
# to be lost.
message.send.max.retries={{ kafka_producer.message_send_max_retries }}
{% endif %}

{% if kafka_producer.retry_backoff_ms %}
# Before each retry, the producer refreshes the metadata of relevant topics to
# see if a new leader has been elected. Since leader election takes a bit of
# time, this property specifies the amount of time that the producer waits
# before refreshing the metadata.
retry.backoff.ms={{ kafka_producer.retry_backoff_ms }}
{% endif %}

{% if kafka_producer.topic_metadata_refresh_interval_ms %}
# The producer generally refreshes the topic metadata from brokers when there is
# a failure (partition missing, leader not available...). It will also poll
# regularly (default: every 10min so 600000ms). If you set this to a negative
# value, metadata will only get refreshed on failure. If you set this to zero,
# the metadata will get refreshed after each message sent (not recommended).
# Important note: the refresh happen only AFTER the message is sent, so if the
# producer never sends a message the metadata is never refreshed
topic.metadata.refresh.interval.ms={{ kafka_producer.topic_metadata_refresh_interval_ms }}
{% endif %}

{% if kafka_producer.send_buffer_bytes %}
# Socket write buffer size
send.buffer.bytes={{ prodcuer.send_buffer_bytes }}
{% endif %}

############################# Async Producer #############################
{% if kafka_producer.queue_buffering_max_ms %}
# maximum time, in milliseconds, for buffering data on the producer queue
queue.buffering.max.ms={{ kafka_producer.queue_buffering_max_ms }}
{% endif %}

{% if kafka_producer.queue_buffering_max_messages %}
# the maximum size of the blocking queue for buffering on the producer
queue.buffering.max.messages={{ kafka_producer.queue_buffering_max_messages }}
{% endif %}

{% if kafka_producer.queue_enqueue_timeout_ms %}
# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
queue.enqueue.timeout.ms={{ kafka_producer.queue_enqueue_timeout_ms }}
{% endif %}

{% if kafka_producer.batch_num_messages %}
# the number of messages batched at the producer
batch.num.messages={{ kafka_producer.batch_num_messages }}
{% endif %}

{% endif %}
